
add support for schema evolution #329

Merged: 28 commits, Jan 12, 2024

Conversation

@redaLaanait (Contributor) commented Nov 13, 2023

The main idea is to generate a composite schema by combining both read and write schemas:

  • The composite schema contains an optional field-level attribute, field.action, which can take one of two values: DRAIN or SET_DEFAULT.

  • field.action is consulted during record decoding, either to skip reading the encoded value or to force setting the default value.

  • To set the default value, I added a createDefaultDecoder function that handles the different schema types and ensures the default value is converted correctly.

  • I also changed the SchemaCompatibility.Compatible function to take field aliases into account.

fixes #20
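The field.action idea can be sketched as follows (a minimal illustration; Action, Drain, SetDefault, and decodeRecord are hypothetical names chosen here, not the PR's actual identifiers):

```go
package main

import "fmt"

// Action mirrors the field-level attribute described above.
type Action int

const (
	NoAction   Action = iota
	Drain             // read and discard the writer's encoded value
	SetDefault        // do not read; force the reader's default value
)

type field struct {
	name   string
	action Action
	def    any
}

// decodeRecord shows how field.action could steer the decoding of one record:
// drained fields are consumed and dropped, defaulted fields never touch the wire.
func decodeRecord(fields []field, read func() any) map[string]any {
	out := map[string]any{}
	for _, f := range fields {
		switch f.action {
		case Drain:
			_ = read() // field exists only in the writer schema
		case SetDefault:
			out[f.name] = f.def // field exists only in the reader schema
		default:
			out[f.name] = read()
		}
	}
	return out
}

func main() {
	vals := []any{int64(1), "dropped"}
	i := 0
	read := func() any { v := vals[i]; i++; return v }
	fields := []field{
		{name: "id", action: NoAction},
		{name: "legacy", action: Drain},
		{name: "tag", action: SetDefault, def: "none"},
	}
	fmt.Println(decodeRecord(fields, read)) // map[id:1 tag:none]
}
```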

@nrwiersma (Member) left a comment

Thanks for taking a stab at this. A few comments from a cursory review.

(review threads on codec_default.go, codec_record.go, and schema_compatibility.go)
@redaLaanait (Contributor Author)

Note: I can keep working on this unless you'd prefer to take it over.

@redaLaanait marked this pull request as draft on November 14, 2023
@redaLaanait (Contributor Author)

I tried the following approach to handle type promotion:

  • added an actual Type field to PrimitiveSchema
  • introduced codecPromoter (used by some native decoders) and readerPromoter (a reader wrapper used by the *Reader.readNext method)

The draft uses reflect.ValueOf on a few occasions... I wonder if this goes against the initial idea of using the reflect2 package.
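Roughly, the readerPromoter idea can be sketched like this (all names and the reader shape below are illustrative stand-ins, not the library's actual API):

```go
package main

import "fmt"

// Type is a stand-in for the library's schema type enum.
type Type string

const (
	Int  Type = "int"
	Long Type = "long"
)

// reader is a toy stand-in that serves pre-decoded values.
type reader struct {
	ints  []int32
	longs []int64
}

func (r *reader) ReadInt() int32  { v := r.ints[0]; r.ints = r.ints[1:]; return v }
func (r *reader) ReadLong() int64 { v := r.longs[0]; r.longs = r.longs[1:]; return v }

// readerPromoter decides at read time which raw read to perform,
// based on the writer schema's actual type.
type readerPromoter struct {
	actual Type
}

func (p *readerPromoter) ReadLong(r *reader) int64 {
	if p.actual == Int {
		return int64(r.ReadInt()) // int -> long promotion
	}
	return r.ReadLong()
}

func main() {
	r := &reader{ints: []int32{7}}
	p := &readerPromoter{actual: Int}
	fmt.Println(p.ReadLong(r)) // 7
}
```

Note the branch runs on every read, which is the runtime cost discussed below.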

@redaLaanait force-pushed the feat/schema_evolution branch from 10b955d to 4ade51d on November 17, 2023
@nrwiersma (Member)

It's an interesting approach. I'll look at it in more detail this weekend. My first impression is that it is quite verbose, but I don't have a better idea at this point, so this is already great work.

@redaLaanait (Contributor Author)

Thanks for the quick feedback!

@nrwiersma (Member)

So I had some time to take a look at the last commits. First off, very nice progress here. It is not as verbose as I had previously thought. One thing I have noticed is that everything happens at runtime, which will carry a heavy performance penalty. However, we have all the information needed to move this from a runtime problem to a planning problem.

My suggestion is to change the codecs to use a specific "conversion" function, for example:

type longCodec[T largeInt] struct {
	convert func(*Reader) int64
}

func (c *longCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) {
	*((*T)(ptr)) = T(c.convert(r))
}

This both cleans up the functions a little and means we know up front what this function looks like. It will still take a performance hit from calling the indirect function, but I suspect this should be quite small; it will need to be benchmarked. For string <-> byte conversion, there are unsafe ways to make this alloc-less.
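For reference, one alloc-less string <-> byte conversion uses the unsafe helpers added in Go 1.20 (a sketch, not code from this PR; the usual caveat applies that the result shares memory with the input and must not be mutated):

```go
package main

import (
	"fmt"
	"unsafe"
)

// bytesToString reinterprets b's backing array as a string without copying.
// Safe only if b is never mutated afterwards.
func bytesToString(b []byte) string {
	return unsafe.String(unsafe.SliceData(b), len(b))
}

// stringToBytes reinterprets s's bytes as a slice without copying.
// The returned slice must be treated as read-only.
func stringToBytes(s string) []byte {
	return unsafe.Slice(unsafe.StringData(s), len(s))
}

func main() {
	b := []byte("avro")
	fmt.Println(bytesToString(b)) // avro, no copy of the backing array
}
```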

The converter creators could look something like:

func createLongConverter(typ Type) (func(*Reader) int64, error) {
	switch typ {
	case Int:
		return func(r *Reader) int64 { return int64(r.ReadInt()) }, nil
	case Long:
		return func(r *Reader) int64 { return r.ReadLong() }, nil
	default:
		return nil, fmt.Errorf("cannot promote from %q to %q", typ, Long)
	}
}

Leaving the planning to look something like this:

case reflect.Uint32:
	if schema.Type() != Long {
		break
	}
	convert, err := createLongConverter(actual)
	if err != nil {
		return &errorDecoder{err: fmt.Errorf("avro: %w", err)}
	}
	return &longCodec[uint32]{convert: convert}

Thoughts?

@redaLaanait (Contributor Author)

Your solution looks cleaner and doesn't use reflect.ValueOf, so as you said, it should perform better.

Regarding the actual field, it might be empty. In this case, we might need to make the convert func(*Reader) int64 function optional and handle its nil value in the decode function call. (I'm not sure if this is the right approach from a performance point of view.)

Another point (a bit opinionated) I was thinking about is that the codecs' native decoders and Reader.readNext are not orthogonal; they implement similar logic, which leads to duplicating the new type promotion logic on both sides. IMO, it would be nice to find a way to reuse the native decoders and, somehow, replace or refactor Reader.readNext.

@nrwiersma (Member)

Regarding the actual field, it might be empty.

Fair point. Either the nil value can be handled, or the empty actual can be treated as a non-conversion. Both work.
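Treating an empty actual as a non-conversion could look like this (a sketch reusing the shape of the createLongConverter proposal above; Type, NoType, and the toy reader are stand-ins, not the library's real API):

```go
package main

import "fmt"

// Type stands in for the library's schema type enum; the zero value
// represents an absent actual type, i.e. no promotion.
type Type string

const (
	NoType Type = ""
	Int    Type = "int"
	Long   Type = "long"
)

type reader struct{ longs []int64 }

func (r *reader) ReadLong() int64 { v := r.longs[0]; r.longs = r.longs[1:]; return v }
func (r *reader) ReadInt() int32  { return 0 }

// createLongConverter always returns a non-nil converter: an empty
// actual type is handled as a plain (non-promoted) long read, so the
// codec never has to check for nil at decode time.
func createLongConverter(typ Type) (func(*reader) int64, error) {
	switch typ {
	case NoType, Long:
		return func(r *reader) int64 { return r.ReadLong() }, nil
	case Int:
		return func(r *reader) int64 { return int64(r.ReadInt()) }, nil
	default:
		return nil, fmt.Errorf("cannot promote from %q to %q", typ, Long)
	}
}

func main() {
	convert, _ := createLongConverter(NoType)
	fmt.Println(convert(&reader{longs: []int64{42}})) // 42
}
```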

IMO, it would be nice to find a way to reuse codec natives decoders and, somehow, replace/refactor Reader.readNext.

I tend to disagree. The Reader is responsible for the binary decoding of the Avro data, while the codecs are responsible for decoding the Schema, as it were, into Go types. This separation makes each part much simpler and reusable. As promotion lives at the schema level rather than the binary level, the changes should happen at the codec level. The codec, however, has two parts: 1) reading the correct binary data from the Reader, and 2) converting it to the correct Go type, and this is what we see in the native promotion code.

@redaLaanait (Contributor Author)

My point is specific to the Reader's readNext method. I think it already does something related to your point 2): converting to the correct Go type.

@nrwiersma (Member)

Ah, I misunderstood. You are correct here. Generic reading should be at the codec level; it would be more consistent.

@redaLaanait (Contributor Author)

I’ll update the draft first to consider your proposed approach, then figure out how to handle the readNext case.

@redaLaanait force-pushed the feat/schema_evolution branch 3 times, most recently from 1877e31 to e89e8c8 on November 25, 2023
@redaLaanait force-pushed the feat/schema_evolution branch from e89e8c8 to 845d478 on November 25, 2023
@redaLaanait (Contributor Author)

I integrated the "convert function idea" into my draft, and ended up moving away from relocating the readNext logic to the codec level, to keep the focus on schema evolution.

If the type promotion handling design looks good, then the remaining central point to fix is handling the default value at the decoder level.

I made some cleanups and slightly reduced verbosity by relying on the fact that the validateDefault function normalizes the default value's type.

Any thoughts or suggestions for improvement or a better solution are welcome!

@redaLaanait (Contributor Author)

To avoid duplicating the type promotion and default value logic in the case of the dynamic decoder, I tried to replace Reader.ReadNext with the following:

Replace:

pObj := (*any)(ptr)
obj := *pObj
*pObj = r.ReadNext(d.schema)

With:

pObj := (*any)(ptr)
obj := *pObj
receiverPtr, receiverTyp, err := dynamicReceiver(d.schema, r.cfg.resolver)
if err != nil {
	r.ReportError("Read", err.Error())
	return
}
decoderOfType(r.cfg, d.schema, receiverTyp).Decode(receiverPtr, r)
*pObj = receiverTyp.UnsafeIndirect(receiverPtr)

Benchmark:

ReadNext

BenchmarkDecoderInterface/Empty_Interface-8                41175             29783 ns/op           16453 B/op        253 allocs/op
BenchmarkDecoderInterface/Interface_Non-Ptr-8              41558             29821 ns/op           16453 B/op        253 allocs/op
BenchmarkDecoderInterface/Interface_Nil_Ptr-8              41488             28455 ns/op           16101 B/op        250 allocs/op
BenchmarkDecoderInterface/Interface_Ptr-8                  41211             28406 ns/op           16101 B/op        250 allocs/op

Dynamic Receiver

BenchmarkDecoderInterface/Empty_Interface-8                34342             33980 ns/op           17259 B/op        287 allocs/op
BenchmarkDecoderInterface/Interface_Non-Ptr-8              34971             33879 ns/op           17259 B/op        287 allocs/op
BenchmarkDecoderInterface/Interface_Nil_Ptr-8              41997             27843 ns/op           16101 B/op        250 allocs/op
BenchmarkDecoderInterface/Interface_Ptr-8                  42182             27798 ns/op           16101 B/op        250 allocs/op

(review threads on schema.go)
@redaLaanait force-pushed the feat/schema_evolution branch from b8aac9b to 84bdd49 on December 11, 2023
@redaLaanait force-pushed the feat/schema_evolution branch from c17c6aa to 7db00da on December 17, 2023
@redaLaanait (Contributor Author)

The draft looks good to me and can be turned into a PR.

One point I haven't received your feedback on is the replacement of readNext at the codec level. It's implemented in codec_generic.go and runs against the same test suite as reader_generic.go.

@redaLaanait marked this pull request as ready for review on December 21, 2023
@redaLaanait changed the title from "add support for schema evolution (POC)" to "add support for schema evolution" on Dec 21, 2023
@nrwiersma (Member)

One point I haven't received your feedback on is the replacement of readNext at the codec level. It's implemented in codec_generic.go and runs against the same test suite as reader_generic.go.

I see the need here, because of the compatibility changes. I would prefer it in a separate PR though, just to reduce the change scope a little, and make it simpler to review.

@nrwiersma (Member) left a comment

First off, great work 🎉 Sorry for the delay, I have been away.
I am doing this review in stages as I have time, as it is quite large.
This is the first round.

(review threads on schema.go and resolver.go)
@redaLaanait (Contributor Author)

Take your time with the review, and be merciless 🙂. It's a large PR in which errors could slip through.

(review threads on schema.go, converter.go, and codec_default.go)
@nrwiersma (Member) left a comment

LGTM 🎉

@nrwiersma merged commit 589f785 into hamba:main on Jan 12, 2024 (2 of 3 checks passed)
@redaLaanait (Contributor Author)

I think this is an issue. The reader will not reset on the second read,

I misused borrowReader, but it was strange that my tests didn't catch this issue.

That's because the generic decoding bypasses the cache and instantiates a new decoder on each call: efaceDecoder.Decode -> genericDecode -> decoderOfType.

I'm not sure if I should do something in this regard.

@nrwiersma (Member)

I am not super stressed. There is a lot in the lib that is difficult, if not impossible, to test.
